#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import re
import subprocess

import parent
from testlib import *


def kill_user_admin(machine):
    # logind from systemd 208 is buggy, so we use systemd directly if it fails
    # https://bugs.freedesktop.org/show_bug.cgi?id=71092
    machine.execute("loginctl terminate-user admin || systemctl kill user-1000.slice")


def authorize_user(m, user, public_keys=["verify/files/ssh/id_rsa.pub"]):
    m.execute(f"mkdir -p /home/{user}/.ssh")
    m.upload(public_keys, f"/home/{user}/.ssh/authorized_keys")
    m.execute("chown -R {0}:{0} /home/{0}/.ssh/".format(user))
    m.execute(f"chmod 600 /home/{user}/.ssh/authorized_keys")
    m.execute(f"chmod 700 /home/{user}/.ssh")


LOAD_KEYS = [
    "id_rsa",  # password: foobar
    "id_ecdsa",  # no password
    "id_dsa",  # password: badbad
    "id_ed25519",  # password: locked
]

KEY_IDS = [
    "2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw /home/admin/.ssh/id_rsa (RSA)",
    "256 SHA256:dyHF4jiKz6RolQqORIATqhbZ4kil5cyiMQWizbQWU8k /home/admin/.ssh/id_ecdsa (ECDSA)",
    "256 SHA256:Wd028KYmG3OVLp7dBmdx0gMR7VcarJVIfaTtKqYCmak /home/admin/.ssh/id_ed25519 (ED25519)"
]

KEY_IDS_MD5 = [
    "2048 93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1 /home/admin/.ssh/id_rsa (RSA)",
    "256 bd:56:df:c3:ff:e4:1d:9d:f5:c4:b9:cc:64:00:d5:93 /home/admin/.ssh/id_ecdsa (ECDSA)",
    "256 b5:80:1b:f5:98:89:2a:39:f3:78:b3:64:5c:64:33:17 /home/admin/.ssh/id_ed25519 (ED25519)",
]


@skipImage("TODO: ssh key check fails on Arch Linux", "arch")
@skipDistroPackage()
class TestMultiMachineKeyAuth(MachineCase):
    provision = {
        "machine1": {"address": "10.111.113.1/20"},
        "machine2": {"address": "10.111.113.2/20"},
    }

    def load_key(self, name, password):
        self.browser.switch_to_top()
        self.browser.eval_js("loaded = false")
        self.browser.eval_js("""
            load = function (user) {{
                const proc = cockpit.spawn([ 'ssh-add', '{0}' ], {{ pty: true, directory: user.home + '/.ssh' }});
                proc.stream(data => {{
                        if (data.indexOf('passphrase') !== -1)
                            proc.input('{1}\\n', true);
                        console.log(data);
                    }})
                    .then(() => {{
                        loaded = true;
                    }})
                    .catch(ex => {{
                        console.error(JSON.stringify(ex));
                    }});
            }}
        """.format(name, password))
        self.browser.eval_js("cockpit.user().done(load)")
        self.browser.wait_js_cond('loaded === true')

    def check_keys(self, keys_md5, keys):
        def normalize(k):
            return re.sub("/home/admin/\\.ssh/[^ ]*|test@test|ecdsa w/o comment", "", k)
        self.assertIn(normalize(self.browser.eval_js("cockpit.spawn([ 'ssh-add', '-l' ])")),
                      [normalize("\n".join(keys_md5) + "\n"),
                       normalize("\n".join(keys) + "\n")])

    def setUp(self):
        super().setUp()
        self.machine2 = self.machines['machine2']

        # Add user
        self.machine2.disconnect()
        self.machine2.execute("useradd user -c 'User' || true", direct=True)
        self.machine2.execute(
            "sed -i 's/.*PasswordAuthentication.*/PasswordAuthentication no/' /etc/ssh/sshd_config $(ls /etc/ssh/sshd_config.d/* 2>/dev/null || true)", direct=True)
        # HACK - Increase MaxAuthTries because of
        # https://bugs.libssh.org/T233 and because we have a lot of
        # keys and cockpit-ssh tries them all, and many of them twice.
        self.machine2.execute("echo 'MaxAuthTries 100' >>/etc/ssh/sshd_config")
        self.machine2.execute(
            "( ! systemctl is-active sshd.socket || systemctl stop sshd.socket) && systemctl restart sshd.service", direct=True)
        self.machine2.wait_execute()

        # Disable preloading on all machines ("machine1" is done in testlib.py)
        # Preloading on machines with debug build can overload the browser and cause slowness and browser crashes
        # In these tests we actually switch between machines in quick succession which can make things even worse
        if self.is_devel_build():
            self.machines["machine2"].write("/usr/share/cockpit/packagekit/override.json", '{ "preload": [ ] }')
            self.machines["machine2"].write("/usr/share/cockpit/systemd/override.json", '{ "preload": [ ] }')

    # Possible workaround - ssh as `admin` and just do `m.execute()`
    @skipBrowser("Firefox cannot do `cockpit.spawn`", "firefox")
    def testBasic(self):
        b = self.browser
        m1 = self.machine
        m2 = self.machine2

        # Load keys
        m1.execute("mkdir -p /home/admin/.ssh")

        m1.upload([f"verify/files/ssh/{k}" for k in LOAD_KEYS],
                  "/home/admin/.ssh/")
        m1.upload([f"verify/files/ssh/{k}.pub" for k in LOAD_KEYS],
                  "/home/admin/.ssh/")
        m1.execute("chmod 400 /home/admin/.ssh/*")
        m1.execute("chown -R admin:admin /home/admin/.ssh")

        self.login_and_go("/system")

        try:
            m1.execute("ps xa | grep ssh-agent | grep -v grep")
        except subprocess.CalledProcessError:
            assert False, "No running ssh-agent found"

        # pam-ssh-add isn't used on OSTree
        if m1.ostree_image:
            self.load_key('id_rsa', 'foobar')
            self.check_keys(["2048 93:40:9e:67:82:78:a8:99:89:39:d5:ba:e0:50:70:e1 id_rsa (RSA)"],
                            ["2048 SHA256:SRvBhCmkCEVnJ6ascVH0AoVEbS3nPbowZkNevJnXtgw id_rsa (RSA)"])
        else:
            # Check our keys were loaded.
            self.load_key("id_ed25519", "locked")
            self.check_keys(KEY_IDS_MD5, KEY_IDS)

        # Add machine
        b.switch_to_top()
        b.go("/@10.111.113.2")
        b.click('#machine-troubleshoot')
        b.wait_visible('#hosts_setup_server_dialog')

        b.wait_text(f'#hosts_setup_server_dialog {self.primary_btn_class}', "Add")
        b.click(f'#hosts_setup_server_dialog {self.primary_btn_class}')
        b.wait_in_text('#hosts_setup_server_dialog', "You are connecting to 10.111.113.2 for the first time.")
        b.click(f'#hosts_setup_server_dialog {self.primary_btn_class}')
        b.wait_in_text('#hosts_setup_server_dialog h1', "Log in to")
        b.wait_in_text('#hosts_setup_server_dialog', "accept password login")
        b.click("#hosts_setup_server_dialog button:contains('Cancel')")
        b.wait_not_present('#hosts_setup_server_dialog')

        # add key
        authorize_user(m2, "admin")

        # Login
        b.click('#machine-troubleshoot')
        b.wait_visible('#hosts_setup_server_dialog')
        b.wait_text(f'#hosts_setup_server_dialog {self.primary_btn_class}', "Add")
        b.click(f'#hosts_setup_server_dialog {self.primary_btn_class}')
        b.wait_not_present('#hosts_setup_server_dialog')
        b.enter_page("/system", host="10.111.113.2")

        # Logout
        b.logout()
        b.wait_visible("#login")

        # Make sure ssh-agent exits
        wait(lambda: "ssh-agent" not in m1.execute("ps xa | grep ss[h]-agent || true"))

        self.login_and_go("/system")
        b.switch_to_top()
        # pam-ssh-add isn't used on OSTree
        if m1.ostree_image:
            self.load_key('id_rsa', 'foobar')

        b.go("/@10.111.113.2")
        b.wait_visible("iframe.container-frame[name='cockpit1:10.111.113.2/system']")

        # Change user
        authorize_user(m2, "user")
        m2.execute("rm /home/admin/.ssh/authorized_keys")

        b.click("#hosts-sel button")
        b.click("button:contains('Edit hosts')")
        b.click("#nav-hosts .nav-item a[href='/@10.111.113.2'] + span button.nav-action.pf-m-secondary")

        b.wait_visible('#hosts_setup_server_dialog')
        b.wait_visible('#add-machine-user')
        self.assertEqual(b.val("#add-machine-user"), "")
        b.set_input_text('#add-machine-user', 'user')
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_not_present('#hosts_setup_server_dialog')

        # We now expect this iframe to disappear
        b.wait_not_present("iframe.container-frame[name='cockpit1:10.111.113.2/system']")

        # And then we expect it to be reloaded after clicking through
        b.wait_visible("a[href='/@10.111.113.2']")
        b.enter_page("/system", host="user@10.111.113.2")

        self.allow_restart_journal_messages()
        self.allow_hostkey_messages()
        # Might happen when killing the bridge.
        self.allow_journal_messages("localhost: dropping message while waiting for child to exit",
                                    "Received message for unknown channel: .*",
                                    ".*: error reading from ssh",
                                    ".*: bridge program failed: Child process exited with code .*",
                                    # Since there is not password,
                                    # reauthorize doesn't work on m2
                                    "received authorize command for wrong user: user",
                                    ".*: user admin reauthorization failed",
                                    "Error executing command as another user: Not authorized",
                                    "This incident has been reported.",
                                    "sudo: a password is required")

    # Possible workaround - ssh as `admin` and just do `m.execute()`
    @skipBrowser("Firefox cannot do `cockpit.spawn`", "firefox")
    def testLockedIdentity(self):
        b = self.browser
        m1 = self.machine
        m2 = self.machine2

        # upload id_ed25519 (password: locked)
        m1.execute("mkdir -p /home/admin/.ssh")
        m1.upload(["verify/files/ssh/id_ed25519", "verify/files/ssh/id_ed25519.pub",
                   "verify/files/ssh/id_rsa", "verify/files/ssh/id_rsa.pub"],
                  "/home/admin/.ssh/")
        m1.write("/home/admin/.ssh/config", """
Host 10.111.113.2
    User user
    IdentityFile /home/admin/.ssh/id_ed25519
""")
        m1.execute("chmod 400 /home/admin/.ssh/*")
        m1.execute("chown -R admin:admin /home/admin/.ssh")
        authorize_user(m2, "user", ["verify/files/ssh/id_ed25519.pub"])

        self.login_and_go("/system")
        b.switch_to_top()

        self.load_key('id_rsa', 'foobar')

        b.click("#hosts-sel button")
        b.click("button:contains('Add new host')")
        b.wait_visible('#hosts_setup_server_dialog')
        b.set_input_text('#add-machine-address', "10.111.113.2")
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_in_text("#hosts_setup_server_dialog", "You are connecting to 10.111.113.2 for the first time.")
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_in_text("#hosts_setup_server_dialog", "/home/admin/.ssh/id_ed25519")
        b.set_input_text("#locked-identity-password", "locked")
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_visible("a[href='/@10.111.113.2']")
        self.allow_hostkey_messages()

    def testLockedDefaultIdentity(self):
        b = self.browser
        m1 = self.machine
        m2 = self.machine2

        # Upload id_rsa and change its password to something else so
        # that it is not automatically loaded into the agent.  id_rsa
        # should be tried autoamtically without needing to configure
        # it explicitly in ~/.ssh/config.

        m1.execute("mkdir -p /home/admin/.ssh")
        m1.upload(["verify/files/ssh/id_rsa", "verify/files/ssh/id_rsa.pub"],
                  "/home/admin/.ssh/")
        m1.execute("chmod 400 /home/admin/.ssh/*")
        m1.execute("chown -R admin:admin /home/admin/.ssh")
        m1.execute("ssh-keygen -p -f /home/admin/.ssh/id_rsa -P foobar -N foobarfoo")
        authorize_user(m2, "admin", ["verify/files/ssh/id_rsa.pub"])

        self.login_and_go("/system")
        b.switch_to_top()

        b.click("#hosts-sel button")
        b.click("button:contains('Add new host')")
        b.wait_visible('#hosts_setup_server_dialog')
        b.set_input_text("#add-machine-address", "10.111.113.2")
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_in_text("#hosts_setup_server_dialog", "You are connecting to 10.111.113.2 for the first time.")
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_in_text("#hosts_setup_server_dialog", "/home/admin/.ssh/id_rsa")
        b.set_input_text("#locked-identity-password", "foobarfoo")
        b.click("#hosts_setup_server_dialog .pf-m-primary")
        b.wait_visible("a[href='/@10.111.113.2']")
        self.allow_hostkey_messages()


if __name__ == '__main__':
    test_main()
